Skip to content

add: multi Topic support#10

Open
GefMar wants to merge 3 commits intomainfrom
MultyTopicBrocker
Open

add: multi Topic support#10
GefMar wants to merge 3 commits intomainfrom
MultyTopicBrocker

Conversation

@GefMar
Copy link
Copy Markdown
Contributor

@GefMar GefMar commented Apr 26, 2026

Summary

Adds multi-topic support to AioKafkaBroker while keeping the existing single-topic behavior fully backward compatible.

Why

Some projects need different task groups to use different Kafka topics, while still running through the same taskiq broker and worker setup. For example, regular tasks, email tasks, reports, or priority-specific workloads may need separate Kafka topics for routing, isolation, monitoring, or operational control.

Previously, AioKafkaBroker always sent every task to one broker-level Kafka topic. This change allows each task to define its own default topic, and also allows overriding the topic for a single kiq call.

What Changed

  • Added TopicConfig and Topic helper structures.
  • Extended AioKafkaBroker with multi-topic support:
    • kafka_topic remains the default topic.
    • kafka_topics can be used to configure additional topics for the worker to listen to.
    • @broker.task(topic="...") sets a default Kafka topic for that task.
    • task.kicker().with_topic("...").kiq(...) overrides the topic for a single task kick.
  • Updated startup so worker consumers subscribe to all configured topics.
  • Updated shutdown topic cleanup to handle all configured topics when delete_topic_on_shutdown=True.
  • Preserved backward compatibility for existing AioKafkaBroker usage.
  • Added tests for:
    • task-level default topic routing;
    • per-kick topic override;
    • fallback to broker default topic;
    • consumer subscription to all topics.
  • Updated README with multi-topic usage examples.

@GefMar GefMar requested review from danfimov and s3rius April 26, 2026 19:16
Copy link
Copy Markdown
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new implementation. We need to not forget to bump major version so people won't get this update by accident. Also I highlighted few places. Which could be updated, but it's not something critical.

Comment thread taskiq_aio_kafka/broker.py Outdated
Comment thread taskiq_aio_kafka/broker.py Outdated
class AioKafkaKicker(AsyncKicker[_FuncParams, _ReturnType]):
"""Kicker that can override kafka topic for a task call."""

def with_topic(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this method is that broker doesn't carry information about the decorator_class field.

Although it will definitely work, but pyright will never suggest you this method, and mypy will always be complaining about using it.

I think this particular issue worth thinking through more carefully. I guess extending kicker functionality for different brokers is one of those things that could actually be helpful for lots of places, but current implementation is a total lame fr. Maybe we could update this type information internally somehow without affecting other brokers allowing users and lib devs to extend kicker functionality while getting all they type-hinting benefits.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interface Changed

@GefMar GefMar requested a review from s3rius April 28, 2026 09:11
GefMar added 2 commits April 28, 2026 16:10
update: task_with_topic interface
update: tests
@GefMar GefMar force-pushed the MultyTopicBrocker branch from 20fbd5e to 32d4dc5 Compare April 28, 2026 14:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants